| 
									
										
										
										
											2019-08-30 22:42:08 +01:00
										 |  |  | <?php | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | use Predis\Client; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class RedisQueueManager extends QueueManager | 
					
						
							|  |  |  | { | 
					
						
							|  |  |  |     protected $queue; | 
					
						
							|  |  |  |     protected $client = null; | 
					
						
							|  |  |  |     protected $server; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     public function __construct(string $server) | 
					
						
							|  |  |  |     { | 
					
						
							| 
									
										
										
										
											2019-08-31 23:53:01 +01:00
										 |  |  |         parent::__construct(); | 
					
						
							| 
									
										
										
										
											2019-08-30 22:42:08 +01:00
										 |  |  |         $this->server = $server; | 
					
						
							|  |  |  |         $this->queue = 'gnusocial:' . common_config('site', 'name'); | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     private function _ensureConn() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         if ($this->client === null) { | 
					
						
							|  |  |  |             $this->client = new Client($this->server); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     public function pollInterval() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         return 10; | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     public function enqueue($object, $queue) | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         $this->_ensureConn(); | 
					
						
							|  |  |  |         $ret = $this->client->rpush($this->queue, $this->encode([$queue, $object])); | 
					
						
							|  |  |  |         if (empty($ret)) { | 
					
						
							|  |  |  |             common_log(LOG_ERR, "Unable to insert object into Redis queue {$queue}"); | 
					
						
							|  |  |  |         } else { | 
					
						
							|  |  |  |             common_debug("The Redis queue for {$queue} has length {$ret}"); | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     public function poll() | 
					
						
							|  |  |  |     { | 
					
						
							|  |  |  |         try { | 
					
						
							|  |  |  |             $this->_ensureConn(); | 
					
						
							|  |  |  |             $ret = $this->client->lpop($this->queue); | 
					
						
							|  |  |  |             if (!empty($ret)) { | 
					
						
							|  |  |  |                 list($queue, $object) = $this->decode($ret); | 
					
						
							|  |  |  |             } else { | 
					
						
							|  |  |  |                 return false; | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         } catch (Exception $e) { | 
					
						
							|  |  |  |             $this->_log(LOG_INFO, "[Queue {$queue}] Discarding: " . _ve($e->getMessage())); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try { | 
					
						
							|  |  |  |             $handler = $this->getHandler($queue); | 
					
						
							|  |  |  |             $handler->handle($object); | 
					
						
							|  |  |  |             common_debug("Redis Queue handled item from {$queue} queue"); | 
					
						
							|  |  |  |             return true; | 
					
						
							|  |  |  |         } catch (Exception $e) { | 
					
						
							|  |  |  |             $this->_log(LOG_ERR, "[Queue: {$queue}] `" . get_class($e) . '` thrown: ' . _ve($e->getMessage())); | 
					
						
							|  |  |  |             return false; | 
					
						
							|  |  |  |         } | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | }; |