forked from GNUsocial/gnu-social
		
	
		
			
	
	
		
			211 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
		
		
			
		
	
	
			211 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| 
								 | 
							
								<?php
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								declare(strict_types=1);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								namespace Enqueue\Stomp;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								use Interop\Queue\Consumer;
							 | 
						||
| 
								 | 
							
								use Interop\Queue\Exception\InvalidMessageException;
							 | 
						||
| 
								 | 
							
								use Interop\Queue\Message;
							 | 
						||
| 
								 | 
							
								use Interop\Queue\Queue;
							 | 
						||
| 
								 | 
							
								use Stomp\Client;
							 | 
						||
| 
								 | 
							
								use Stomp\Transport\Frame;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Consumer that reads MESSAGE frames for one StompDestination through a
								 * BufferedStompClient and turns them into StompMessage instances.
								 *
								 * The SUBSCRIBE frame is sent lazily by the first receive()/receiveNoWait()
								 * call. The ack mode and prefetch count are only read at that moment, so
								 * changing them afterwards has no effect on the already-sent subscription.
								 */
								class StompConsumer implements Consumer
								{
								    public const ACK_AUTO = 'auto';
								    public const ACK_CLIENT = 'client';
								    public const ACK_CLIENT_INDIVIDUAL = 'client-individual';

								    /**
								     * Destination this consumer reads from.
								     *
								     * @var StompDestination
								     */
								    private $queue;

								    /**
								     * Client used to send frames and read buffered MESSAGE frames.
								     *
								     * @var BufferedStompClient
								     */
								    private $stomp;

								    /**
								     * Whether subscribe() has already run to completion for this consumer.
								     *
								     * @var bool
								     */
								    private $isSubscribed;

								    /**
								     * One of the ACK_* constants; passed to the SUBSCRIBE frame.
								     *
								     * @var string
								     */
								    private $ackMode;

								    /**
								     * Value sent as the "prefetch-count" SUBSCRIBE header.
								     *
								     * @var int
								     */
								    private $prefetchCount;

								    /**
								     * Identifier used both in the SUBSCRIBE frame and when reading frames.
								     *
								     * @var string
								     */
								    private $subscriptionId;

								    /**
								     * Defaults: ack mode "client-individual", prefetch count 1, not yet subscribed.
								     *
								     * @param BufferedStompClient $stomp client used for all frame I/O
								     * @param StompDestination    $queue destination to consume from
								     */
								    public function __construct(BufferedStompClient $stomp, StompDestination $queue)
								    {
								        $this->stomp = $stomp;
								        $this->queue = $queue;
								        $this->isSubscribed = false;
								        $this->ackMode = self::ACK_CLIENT_INDIVIDUAL;
								        $this->prefetchCount = 1;

								        // Temp queues reuse their queue name as the subscription id; any other
								        // destination gets a freshly generated unique id.
								        // NOTE(review): loose comparison kept because getType()'s return type is
								        // not visible from this file.
								        $this->subscriptionId = StompDestination::TYPE_TEMP_QUEUE == $queue->getType()
								            ? $queue->getQueueName()
								            : uniqid('', true);
								    }

								    /**
								     * Selects the ack mode used when the SUBSCRIBE frame is built.
								     *
								     * @param string $mode one of ACK_AUTO, ACK_CLIENT, ACK_CLIENT_INDIVIDUAL
								     *
								     * @throws \LogicException when $mode is not one of the ACK_* constants
								     */
								    public function setAckMode(string $mode): void
								    {
								        $allowedModes = [self::ACK_AUTO, self::ACK_CLIENT, self::ACK_CLIENT_INDIVIDUAL];

								        if (!in_array($mode, $allowedModes, true)) {
								            throw new \LogicException(sprintf('Ack mode is not valid: "%s"', $mode));
								        }

								        $this->ackMode = $mode;
								    }

								    /**
								     * @return string the currently configured ack mode
								     */
								    public function getAckMode(): string
								    {
								        return $this->ackMode;
								    }

								    /**
								     * @return int the value that is sent as the "prefetch-count" header
								     */
								    public function getPrefetchCount(): int
								    {
								        return $this->prefetchCount;
								    }

								    /**
								     * Sets the value sent as the "prefetch-count" header on subscription.
								     * The value is stored as given; no range check is applied here.
								     */
								    public function setPrefetchCount(int $prefetchCount): void
								    {
								        $this->prefetchCount = $prefetchCount;
								    }

								    /**
								     * @return StompDestination
								     */
								    public function getQueue(): Queue
								    {
								        return $this->queue;
								    }

								    /**
								     * Subscribes if needed and waits for the next message.
								     *
								     * A non-zero $timeout is passed straight to readMessageFrame() and null is
								     * returned when no frame arrived. A zero $timeout never returns null: the
								     * client is polled repeatedly (with a timeout argument of 100) until a
								     * frame shows up.
								     *
								     * @param int $timeout forwarded to BufferedStompClient::readMessageFrame();
								     *                     presumably milliseconds - TODO confirm against the client
								     *
								     * @throws \LogicException when the received frame is not a MESSAGE frame
								     */
								    public function receive(int $timeout = 0): ?Message
								    {
								        $this->subscribe();

								        if (0 !== $timeout) {
								            $frame = $this->stomp->readMessageFrame($this->subscriptionId, $timeout);

								            return $frame ? $this->convertMessage($frame) : null;
								        }

								        // Zero means "block until something arrives": keep polling in short slices.
								        while (true) {
								            if ($frame = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
								                return $this->convertMessage($frame);
								            }
								        }
								    }

								    /**
								     * Subscribes if needed and performs a single read with a timeout of 0.
								     *
								     * @return Message|null the converted message, or null when no frame was read
								     *
								     * @throws \LogicException when the received frame is not a MESSAGE frame
								     */
								    public function receiveNoWait(): ?Message
								    {
								        $this->subscribe();

								        $frame = $this->stomp->readMessageFrame($this->subscriptionId, 0);

								        return $frame ? $this->convertMessage($frame) : null;
								    }

								    /**
								     * Sends the protocol's ACK frame for the frame attached to $message.
								     *
								     * The message type is checked through
								     * InvalidMessageException::assertMessageInstanceOf() first.
								     *
								     * @param StompMessage $message
								     */
								    public function acknowledge(Message $message): void
								    {
								        InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);

								        $ackFrame = $this->stomp->getProtocol()->getAckFrame($message->getFrame());

								        $this->stomp->sendFrame($ackFrame);
								    }

								    /**
								     * Sends the protocol's NACK frame for the frame attached to $message,
								     * carrying a "requeue" header of "true" or "false".
								     *
								     * The message type is checked through
								     * InvalidMessageException::assertMessageInstanceOf() first.
								     *
								     * @param StompMessage $message
								     */
								    public function reject(Message $message, bool $requeue = false): void
								    {
								        InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);

								        $nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());

								        // rabbitmq STOMP protocol extension
								        $nackFrame->addHeaders([
								            'requeue' => $requeue ? 'true' : 'false',
								        ]);

								        $this->stomp->sendFrame($nackFrame);
								    }

								    /**
								     * Sends the SUBSCRIBE frame once per consumer.
								     *
								     * Temp queues are only marked as subscribed; no frame is sent for them.
								     * For every other destination the frame carries the destination headers
								     * plus "prefetch-count", encoded with StompHeadersEncoder.
								     */
								    private function subscribe(): void
								    {
								        if (StompDestination::TYPE_TEMP_QUEUE == $this->queue->getType()) {
								            $this->isSubscribed = true;

								            return;
								        }

								        if ($this->isSubscribed) {
								            return;
								        }

								        $frame = $this->stomp->getProtocol()->getSubscribeFrame(
								            $this->queue->getQueueName(),
								            $this->subscriptionId,
								            $this->ackMode
								        );

								        // rabbitmq STOMP protocol extension
								        $headers = $this->queue->getHeaders();
								        $headers['prefetch-count'] = $this->prefetchCount;

								        foreach (StompHeadersEncoder::encode($headers) as $key => $value) {
								            $frame[$key] = $value;
								        }

								        $this->stomp->sendFrame($frame);

								        // Mark as subscribed only once the frame was handed to the client, so a
								        // failure above leaves the consumer able to retry on the next receive
								        // instead of silently polling a subscription that was never created.
								        $this->isSubscribed = true;
								    }

								    /**
								     * Builds a StompMessage from a MESSAGE frame.
								     *
								     * The frame headers are decoded into headers and properties, the
								     * "redelivered" header is turned into the message's redelivered flag, and
								     * transport-level headers are dropped from the message headers. The
								     * original frame is attached to the message (it is needed for ACK/NACK).
								     *
								     * @throws \LogicException when the frame command is not "MESSAGE"
								     */
								    private function convertMessage(Frame $frame): StompMessage
								    {
								        if ('MESSAGE' !== $frame->getCommand()) {
								            throw new \LogicException(sprintf('Frame is not MESSAGE frame but: "%s"', $frame->getCommand()));
								        }

								        [$headers, $properties] = StompHeadersEncoder::decode($frame->getHeaders());

								        $redelivered = isset($headers['redelivered']) && 'true' === $headers['redelivered'];

								        // Strip headers that describe the delivery itself rather than the message.
								        unset(
								            $headers['redelivered'],
								            $headers['destination'],
								            $headers['message-id'],
								            $headers['ack'],
								            $headers['receipt'],
								            $headers['subscription'],
								            $headers['content-length']
								        );

								        $message = new StompMessage((string) $frame->getBody(), $properties, $headers);
								        $message->setRedelivered($redelivered);
								        $message->setFrame($frame);

								        return $message;
								    }
								}
							 |