<?php
/**
 * StatusNet, the distributed open-source microblogging tool
 *
 * Simple-minded queue manager for storing items in the database
 *
 * PHP version 5
 *
 * LICENCE: This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @category  QueueManager
 * @package   StatusNet
 * @author    Evan Prodromou <evan@status.net>
 * @copyright 2009 StatusNet, Inc.
 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
 * @link      http://status.net/
 */

					
						
							|  |  |  | class DBQueueManager extends QueueManager | 
					
						
							|  |  |  | { | 
					
						
							|  |  |  |     var $qis = array(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     function enqueue($object, $queue) | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2009-07-01 11:09:42 -04:00
										 |  |  |         $notice = $object; | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         $qi = new Queue_item(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $qi->notice_id = $notice->id; | 
					
						
							|  |  |  |         $qi->transport = $queue; | 
					
						
							|  |  |  |         $qi->created   = $notice->created; | 
					
						
							|  |  |  |         $result        = $qi->insert(); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (!$result) { | 
					
						
							|  |  |  |             common_log_db_error($qi, 'INSERT', __FILE__); | 
					
						
							|  |  |  |             throw new ServerException('DB error inserting queue item'); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return true; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-04 00:31:28 -04:00
										 |  |  |     function service($queue, $handler) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         while (true) { | 
					
						
							|  |  |  |             $this->_log(LOG_DEBUG, 'Checking for notices...'); | 
					
						
							| 
									
										
										
										
											2009-07-21 12:50:32 -07:00
										 |  |  |             $timeout = $handler->timeout(); | 
					
						
							|  |  |  |             $notice = $this->_nextItem($queue, $timeout); | 
					
						
							| 
									
										
										
										
											2009-07-04 00:31:28 -04:00
										 |  |  |             if (empty($notice)) { | 
					
						
							|  |  |  |                 $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); | 
					
						
							|  |  |  |                 // Nothing in the queue. Do you
 | 
					
						
							|  |  |  |                 // have other tasks, like servicing your
 | 
					
						
							|  |  |  |                 // XMPP connection, to do?
 | 
					
						
							|  |  |  |                 $handler->idle(QUEUE_HANDLER_MISS_IDLE); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 $this->_log(LOG_INFO, 'Got notice '. $notice->id); | 
					
						
							|  |  |  |                 // Yay! Got one!
 | 
					
						
							|  |  |  |                 if ($handler->handle_notice($notice)) { | 
					
						
							|  |  |  |                     $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); | 
					
						
							|  |  |  |                     $this->_done($notice, $queue); | 
					
						
							|  |  |  |                 } else { | 
					
						
							|  |  |  |                     $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); | 
					
						
							|  |  |  |                     $this->_fail($notice, $queue); | 
					
						
							|  |  |  |                 } | 
					
						
							|  |  |  |                 // Chance to e.g. service your XMPP connection
 | 
					
						
							|  |  |  |                 $this->_log(LOG_DEBUG, 'Idling after success.'); | 
					
						
							|  |  |  |                 $handler->idle(QUEUE_HANDLER_HIT_IDLE); | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |             // XXX: when do we give up?
 | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     function _nextItem($queue, $timeout=null) | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							|  |  |  |         $start = time(); | 
					
						
							|  |  |  |         $result = null; | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-23 14:56:21 -07:00
										 |  |  |         $sleeptime = 1; | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |         do { | 
					
						
							|  |  |  |             $qi = Queue_item::top($queue); | 
					
						
							| 
									
										
										
										
											2009-07-22 09:25:27 +01:00
										 |  |  |             if (empty($qi)) { | 
					
						
							| 
									
										
										
										
											2009-07-23 14:56:21 -07:00
										 |  |  |                 $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds."); | 
					
						
							|  |  |  |                 sleep($sleeptime); | 
					
						
							|  |  |  |                 $sleeptime *= 2; | 
					
						
							| 
									
										
										
										
											2009-07-22 09:25:27 +01:00
										 |  |  |             } else { | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |                 $notice = Notice::staticGet('id', $qi->notice_id); | 
					
						
							|  |  |  |                 if (!empty($notice)) { | 
					
						
							|  |  |  |                     $result = $notice; | 
					
						
							|  |  |  |                 } else { | 
					
						
							|  |  |  |                     $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); | 
					
						
							|  |  |  |                     $qi->delete(); | 
					
						
							|  |  |  |                     $qi->free(); | 
					
						
							|  |  |  |                     $qi = null; | 
					
						
							|  |  |  |                 } | 
					
						
							| 
									
										
										
										
											2009-07-23 14:56:21 -07:00
										 |  |  |                 $sleeptime = 1; | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |             } | 
					
						
							|  |  |  |         } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return $result; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-04 00:31:28 -04:00
										 |  |  |     function _done($object, $queue) | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |     { | 
					
						
							| 
									
										
										
										
											2009-07-01 11:09:42 -04:00
										 |  |  |         // XXX: right now, we only handle notices
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $notice = $object; | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, | 
					
						
							|  |  |  |                                         'transport' => $queue)); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (empty($qi)) { | 
					
						
							| 
									
										
										
										
											2009-07-01 12:10:25 -04:00
										 |  |  |             $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |         } else { | 
					
						
							|  |  |  |             if (empty($qi->claimed)) { | 
					
						
							| 
									
										
										
										
											2009-07-01 12:10:25 -04:00
										 |  |  |                 $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. | 
					
						
							| 
									
										
										
										
											2009-07-23 14:50:47 -07:00
										 |  |  |                             'for '.$notice->id.', queue '.$queue); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  |             } | 
					
						
							|  |  |  |             $qi->delete(); | 
					
						
							|  |  |  |             $qi->free(); | 
					
						
							|  |  |  |             $qi = null; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-01 12:10:25 -04:00
										 |  |  |         $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $notice->free(); | 
					
						
							|  |  |  |         $notice = null; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2009-07-04 00:31:28 -04:00
										 |  |  |     function _fail($object, $queue) | 
					
						
							| 
									
										
										
										
											2009-07-01 12:10:25 -04:00
										 |  |  |     { | 
					
						
							|  |  |  |         // XXX: right now, we only handle notices
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $notice = $object; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, | 
					
						
							|  |  |  |                                         'transport' => $queue)); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (empty($qi)) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             if (empty($qi->claimed)) { | 
					
						
							|  |  |  |                 $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. | 
					
						
							| 
									
										
										
										
											2009-07-23 14:50:47 -07:00
										 |  |  |                             'for '.$notice->id.', queue '.$queue); | 
					
						
							| 
									
										
										
										
											2009-07-01 12:10:25 -04:00
										 |  |  |             } else { | 
					
						
							|  |  |  |                 $orig = clone($qi); | 
					
						
							|  |  |  |                 $qi->claimed = null; | 
					
						
							|  |  |  |                 $qi->update($orig); | 
					
						
							|  |  |  |                 $qi = null; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); | 
					
						
							| 
									
										
										
										
											2009-06-28 14:38:31 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |         $notice->free(); | 
					
						
							|  |  |  |         $notice = null; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     function _log($level, $msg) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         common_log($level, 'DBQueueManager: '.$msg); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | } |